深入理解Vert.x Core(1)-Context

Context

Vert.x核心组件概览一文中已经谈到了Context的基本概念,在更加深入地了解VertxImpl,Verticle的内部原理等等之前,有必要了解Context的一些内部机制。本文将对Vert.x的Context进行深入分析。

先来看下Context的结构

ShowImage

最顶层的是Context接口,下面是ContextInternal接口,这个接口是提供给Vert.x内部使用的API,ContextImpl是一个抽象类,包含了Context的基本属性以及方法,再往下层就是Context的实现类,其实看到源码可以知道Vert.x中一共有四种Context,但是这个BenchmarkContext的目的不是给用户使用的,它主要被用来对Vert.x一些关键部分进行性能测试,比如Json的编解码、HTTP头部编码、HTTP Handler处理HttpRequest和HttpResponse的速度等等…可以参考这里。就是说,真正在Vert.x中应用的Context就三种。三种Context使用的场景在这里就不再介绍,可以参考对应Verticle的用途

EventLoopContext

我们先对EventLoopContext进行分析。

ShowImage

isEventLoopContext(),isMultiThreadedWorkerContext()

EventLoopContext本身只有几个方法,isEventLoopContext()方法和isMultiThreadedWorkerContext()方法就不用多说了,继承ContextImpl中的抽象方法,表明自己身份用的。

checkCorrectThread()

先看下checkCorrectThread方法,这个是实现了ContextImpl类中抽象方法的方法,会去判断当前线程是不是Vertx线程,如果是Vertx线程还会判断context的线程和当前线程是不是相同线程,看代码

1
2
3
4
5
6
7
8
9
@Override
protected void checkCorrectThread() {
Thread current = Thread.currentThread();
if (!(current instanceof VertxThread)) {
throw new IllegalStateException("Expected to be on Vert.x thread, but actually on: " + current);
} else if (contextThread != null && current != contextThread) {
throw new IllegalStateException("Event delivered on unexpected thread " + current + " expected: " + contextThread);
}
}

executeAsync()

接着看executeAsync()方法,在抽象类ContextImpl中也声明了executeAsync()抽象方法

1
protected abstract void executeAsync(Handler<Void> task);

找到实际调用这个方法的地方

1
2
3
4
5
6
7
8
9
// Run the task asynchronously on this same context
@Override
public void runOnContext(Handler<Void> task) {
try {
executeAsync(task);
} catch (RejectedExecutionException ignore) {
// Pool is already shut down
}
}

ContextImpl这个类中多次用到了模板方法的设计模式,例如上面的executeAsync()checkCorrectThread()都用到了,将抽象方法写在指定逻辑中,然后交给子类去实现这些抽象方法。

runOnContext()方法是定义在Context接口中的,用来指明当前context中异步地执行指定的操作,这个方法在很多地方都用到了,举例你在Context关联的线程之外想去执行一个action,但是这个action并不是在你当前所在的Vertx线程控制的范围内执行(例如你new Thread去执行),现在你想要的是这个action会改变一些原来Vertx线程内的状态,如果想让这些改变的状态在这个context thread之中可见,就必须使用runOnContext()方法去执行。

executeAsync()方法中的逻辑

1
2
3
4
public void executeAsync(Handler<Void> task) {
// No metrics, we are on the event loop.
nettyEventLoop().execute(wrapTask(null, task, true, null));
}
ContextImpl#nettyEventLoop()

nettyEventLoop()方法在ContextImpl类中,返回持有的EventLoop实例,而这个EventLoop instance除非自己指定,否则是使用在ContextImpl的Constructor中传入的VertxInternal对象获取的,通过这个vertx对象的getEventLoopGroup()方法拿到EventLoopGroup,接着从Netty的EventLoopGroup中通过next()方法选出一个EventLoop作为返回值,注意这里的EventLoopGroupEventLoop是Netty中的概念。

1
2
3
4
5
6
7
8
private static EventLoop getEventLoop(VertxInternal vertx) {
EventLoopGroup group = vertx.getEventLoopGroup();
if (group != null) {
return group.next();
} else {
return null;
}
}

这段是《Netty in Action》中提到的Channel,EventLoop,Thread以及EventLoopGroup之间的关系:

  • 一个EventLoopGroup包含一个或者多个EventLoop

  • 一个EventLoop在它的生命周期内之和一个Thread关联

  • 所有由EventLoop处理的I/O事件都将在它专有的Thread上被处理

  • 一个Channel在它的生命周期内只会注册到一个EventLoop

  • 一个EventLoop可能会指派到一个或多个Channel

    在这种设计中,一个给定Channel的I/O操作都是由相同的Thread执行的,实际上消除了对于同步的需要。

这样就从netty的EventLoopGroup中拿到了EventLoop,接下来就是用wrapTask()方法将Vert.x中的要处理的event封装成Runnable接口提交给Netty的EventLoop了。

注:提交的EventLoop不仅仅是NioEventLoop, Vert.x在3.5.0版本加入了对Linux上Epoll和OS X上Kqueue的Native Transport支持,Netty中的异步传输可以不再只是使用NIO了。关于Netty中EventLoop为什么使用execute()方法接收Runnable接口,可以去找一找相关的EventLoop和JUC中Executor的资料。

我们对wrapTask()方法进行简化,省去一些Metrics管理,时间记录,检查操作的逻辑以及错误处理,得到的源码是这样的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected Runnable wrapTask(ContextTask cTask, Handler<Void> hTask, boolean checkThread, PoolMetrics metrics) {
return () -> {
Thread th = Thread.currentThread();
VertxThread current = (VertxThread) th;
contextThread = current;
try {
setContext(current, ContextImpl.this);
if (cTask != null) {
cTask.run();
} else {
hTask.handle(null);
}
} catch (Throwable t) {
// handle t
} finally {
// clean up
}
};
}

可以看到简化后的wrapTask()方法并没有太复杂的操作,仅仅将executeAsync(Handler<Void> task)方法传入的参数task封装成一个Runnable,做一些基本的检查线程的操作,交给hTask.handle(null)处理就行了。需要注意的只有一点,就是这个Runnable执行前会将它执行时的线程(必须是Vertx线程)与你调用runOnContext()方法的context关联。到现在可以确认的是,我们已经知道从Vert.x EventLoopContext上拿到的Task已经能够在Netty的EventLoop上面跑了。

Constructor

EventLoopContext的构造器调用的是父类的构造器,现在我们将目光转向ContextImpl类。

ContextImpl

ContextImpl这个抽象类,内容非常的多。
ShowImage

Properties

按照顺序逐个分析,Logger不必说,先看六个常量,分别对应三组Key-Value,String常量表示Key,Boolean常量表示Value。THREAD_CHECKS_PROP_NAME对应THREAD_CHECKS,负责进行线程检查的开关;DISABLE_TIMINGS_PROP_NAME对应DISABLE_TIMINGS,负责进行时间计量的开关;DISABLE_TCCL_PROP_NAME对应DISABLE_TCCL,负责TCCL线程上下文类加载器的检测的开关。

VertxInternal

owner变量很直观,就是对当前的Vertx引用,之前讲过一个Vert.x实例会有多个Eventloops,而每个EventLoop同一时间只会关联一个Context,反过来想一个Context也就对应一个Vert.x实例。这里类型是VertxInternal,主要面向内部使用。

deploymentID, config

然后是deploymentIDconfig两个变量,这两个变量不复杂,找到相关联的地方就知道是给Verticle使用的属性,其中deploymentID通过generateDeploymentID()这个方法生成,

1
2
3
private String generateDeploymentID() {
return UUID.randomUUID().toString();
}

在部署Verticle时候作为返回结果,而config就是部署Verticle时DeploymentOptions这个对象所包含的config属性的拷贝。前面也讲过了每个Verticle实例都会关联到唯一的一个Context,如果你看一眼AbstractVerticle里面的deploymentID()config()方法,你会发现它们都调用的是所关联的context的对应方法。

closeHooks

closeHooks提供了提醒自动清理的功能,在Verticle或者Vertx实例关闭的时候,上面跑的组件如果有closeHooks就可以实现自动清理。这个功能主要通过观察者模式实现,有兴趣可以结合源码并参考这篇文章看看。
https://github.com/vietj/vertx-materials/blob/master/src/main/asciidoc/Close_hooks.adoc

tccl

Context所在线程的Thread Context ClassLoader,通过构造器和set方法进行设置。

eventLoop

Context对应的Netty的EventLoop实例,之前在讲EventLoopContext时候已经提过了。

contextThread

当前Context所关联的VertxThread实例,在ContextImpl类主要被用来做一些检查操作。

contextData

这个官方文档说的非常明白,contextData就是让你可以在同一个Context的handlers之间共享任意类型的数据,通过put(),get(),remove()方法进行操作,数据结构就是ConcurrentHashMap

获取contextData对象的代码

1
2
3
4
5
6
public synchronized ConcurrentMap<Object, Object> contextData() {
if (contextData == null) {
contextData = new ConcurrentHashMap<>();
}
return contextData;
}

exceptionHandler

为Context注册一个exceptionHandler,如果Context执行的action抛出异常自己无法捕获时,就会使用这个Context本身的exceptionHandler。
可以自己显式地调用exceptionHandler(Handler<Throwable> handler)方法去给context注册,还可以通过关联的VertxInternal实例中的exceptionHandler注册。
还是ContextImpl中的wrapTask()方法,看下怎么处理异常的,把逻辑简化下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected Runnable wrapTask(ContextTask cTask, Handler<Void> hTask, boolean checkThread, PoolMetrics metrics) {
// metrics
return () -> {
// some checks...
try {
// logic
} catch (Throwable t) {
log.error("Unhandled exception", t);
Handler<Throwable> handler = this.exceptionHandler;
if (handler == null) {
handler = owner.exceptionHandler();
}
if (handler != null) {
handler.handle(t);
}
} finally {
// clean up
}
};
}

如果try块中action自己处理不了异常,那么就先交给context的exceptionHandler来处理。如果这个exceptionHandler为空,就会使用关联的owner的exceptionHandler处理。

deployment

Vert.x中verticle的deploy与undeploy操作都是交给DeploymentManager或者HAManager去处理的,Context中deployment对象就是一个DeploymentImpl实例,DeploymentImpl类是DeploymentManager类中的一个内部类,实现了Deployment接口,任务就是负责维护与Context相关联的Verticle的状态。ContextImplgetInstanceCount()方法就是调用deployment实例中deploymentOptions()方法来获取verticle instance数量。

processArgs()方法

从Launcher命令行获取到的参数,如果没有使用Launcher(拿到参数为null)那么就返回Starter的参数。AbstractVerticle#processArgs()方法调用的也是context的processArgs()方法。

注:Starter已经Deprecated,一般都会使用Launcher。

setContext()静态方法

将调用时所在的当前线程与指定Context关联起来。

executeFromIO()方法

先看代码

1
2
3
4
5
6
public void executeFromIO(ContextTask task) {
if (THREAD_CHECKS) {
checkCorrectThread();
}
wrapTask(task, null, true, null).run();
}

这个方法接受一个ContextTask参数,ContextTask是一个函数接口。经过线程检查以后,执行的是wrapTask(task, null, true, null).run();。这个方法当第一个参数不为空时,调用的是cTask.run()而不是hTask.handle(null),也就是说直接在当前线程上执行该task。executeFromIO()方法什么时候去调用它呢?就是当你需要在IO环境中去执行一些代码,比如说你需要在Netty代码中的ChannelFuture去处理一些Vert.x的handler回调逻辑时,这个时候就必须有一个对vert.x context的引用,然后用这个context的executeFromIO()方法去处理你的Vert.x API的逻辑。

多余的一些字段和方法

isOnWorkerThread()
isOnEventLoopThread()
isOnVertxThread()
isWorkerContext()
isEventLoopContext()
isMultiThreadedWorkerContext()
这些方法都能从字面上理解,源码也不复杂,这里就省略了

还有一些非常重要的Stuff

在谈WorkerContext之前,还需要更加深入ContextImpl,前面讲EventLoopContext的时候说的比较简略,现在我们dive deeper一些。

Constructor

先回到ContextImpl类的构造器方法

1
2
3
4
protected ContextImpl(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, String deploymentID, JsonObject config,
ClassLoader tccl) {
this(vertx, getEventLoop(vertx), internalBlockingPool, workerPool, deploymentID, config, tccl);
}

getEventLoop()这个方法之前已经讲过了,我们将目光转向这两个字段internalBlockingPoolworkerPool。查看类型发现它们都是WorkerPool,WorkerPool就是包装了一个ExecutorServicePoolMetrics的类,那么它们两个Pool之间有什么区别呢?首先来看一看internalBlockingPool的工作目的,这个pool是给Vert.x内部进行的阻塞操作使用,比如你使用FileSystem的一些操作或者AsyncFile的flush操作时,Vert.x会将这些阻塞的操作交给internalBlockingPool而不是workerPool去处理。与之相对的workerPool就是提供给用户来进行阻塞操作的pool。

TaskQueue

ContextImpl中还有两个TaskQueue,这个类的目的很明确

A task queue that always run all tasks in order.

ShowImage

剖析一下TaskQueue的结构,首先TaskQueue有一个LinkedList<Task>列表去维护当前TaskQueue队列中需要被执行的任务Task,其中Task是一个自定义的静态内部类,以及一个Executor引用表示当前的Executor,还有一个Runnable引用。TaskQueue的构造方法很简单,就是直接让自己的Runnable引用runner指向自己run方法。

1
2
3
public TaskQueue() {
runner = this::run;
}

调用队列的execute()方法可以执行一个新的任务

1
2
3
4
5
6
7
8
9
public void execute(Runnable task, Executor executor) {
synchronized (tasks) {
tasks.add(new Task(task, executor));
if (current == null) {
current = executor;
executor.execute(runner);
}
}
}

先对队列进行同步操作,将当前task封装成Task对象加入tasks队列。判断当前任务是否执行完了,如果之前的任务执行完了才去执行当前这个任务,否则execute()只会将任务加到队列,然后退出循环,这样就保证了同一个队列TaskQueue内的任务是一个个执行的,如果任务发送过多而前面没有处理完,就会出现任务堆积。

接着是run方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void run() {
for (; ; ) {
final Task task;
synchronized (tasks) {
task = tasks.poll();
if (task == null) {
current = null;
return;
}
if (task.exec != current) {
tasks.addFirst(task);
task.exec.execute(runner);
current = task.exec;
return;
}
}
try {
task.runnable.run();
} catch (Throwable t) {
log.error("Caught unexpected Throwable", t);
}
}
};

先进入死循环,声明当前即将被执行的task, 接着对tasks进行同步避免冲突操作,通过LinkedListpoll()从队列头取出一个任务交给引用task
这个时候会有一些条件的判断,如果取出的是null,说明队列中没有任务需要执行了,这个时候将current置为null,并且退出循环,这样下次进入循环的时候就能执行任务了。
如果取到任务后发现当前取到的任务关联的executor不是当前的TaskQueue中的current executor,那么将这个任务重新退回队列第一个,并且将runner加入与这个任务相关联的executor(执行会在稍后发生,这里不一定会立即执行),并且将TaskQueue的current置为该executor,这个过程做完就退出循环。
如果之前两个条件都满足了,说明这个任务现在是可以执行的,那么直接使用task.runnable.run();执行阻塞操作。这里用一个流程图来看更直观一点。

ShowImage

现在我们已经知道如何通过TaskQueue来保障阻塞操作执行的顺序。接下来看executeBlocking()方法

executeBlocking()方法

ContextImpl类中一共有5个executeBlocking()重载方法,一个一个来看他们的作用。
先看第一个方法代码

1
2
3
4
// Execute an internal task on the internal blocking ordered executor
public <T> void executeBlocking(Action<T> action, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(action, null, resultHandler, internalBlockingPool.executor(), internalOrderedTasks, internalBlockingPool.metrics());
}

这个代码已经有一行注释了,上面清楚地写着是按顺序执行一个internal的阻塞task。将当前context的internalBlockingPool executor与internal队列传入重载的executeBlocking()方法,现在来看这个重载方法的代码,所有的逻辑都是在这里完成的,省去Metrics和Timing部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<T> void executeBlocking(Action<T> action, Handler<Future<T>> blockingCodeHandler,
Handler<AsyncResult<T>> resultHandler,
Executor exec, TaskQueue queue, PoolMetrics metrics) {
try {
Runnable command = () -> {
Future<T> res = Future.future();
try {
if (blockingCodeHandler != null) {
ContextImpl.setContext(this);
blockingCodeHandler.handle(res);
} else {
T result = action.perform();
res.complete(result);
}
} catch (Throwable e) {
res.fail(e);
} finally {
// omit this
}
}
if (resultHandler != null) {
runOnContext(v -> res.setHandler(resultHandler));
}
};
if (queue != null) {
queue.execute(command, exec);
} else {
exec.execute(command);
}
} catch (RejectedExecutionException e) {
// omit this
}
}

首先声明一个Runnable对象command对即将进行的阻塞操作进行封装,由于传入blockingCodeHandler为null,因此直接执行action,将结果告知给前面定义的Future中并且设置好resultHandler回调,最后就是将这个command交给internal队列去处理了,queue.execute(command, exec);使得在交给队列之前将这个任务与当前context下的internalBlockingPoolexecutor关联。

接下来就是这个重载方法

1
2
3
4
@Override
public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(blockingCodeHandler, true, resultHandler);
}

这个方法就是有序直接执行阻塞任务。继续下一层重载方法

1
2
3
4
@Override
public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(null, blockingCodeHandler, resultHandler, workerPool.executor(), ordered ? orderedTasks : null, workerPool.metrics());
}

这个方法会去指定执行的executor是workerPool中的executor,并且如果有序的话就用orderedTasks去保证执行任务的顺序,继续回到上面实现真正逻辑的重载方法。
这次blockingCodeHandler不为空,那么就将当前所在线程的context设置为正在用的context,并且将异步操作的结果通过future与resultHandler回调hook上。如果有序就用taskQueue保证任务的执行顺序,无序就直接丢给对应的executor执行。

还有最后一个方法是自己指定TaskQueue的重载方法,逻辑跟有序执行基本差不多,只不过TaskQueue是自己指定的,这里就省略了。

在理清了ContextImpl的一切之后,接下来看WorkerContextMultiThreadedWorkerContext就没有那么复杂了。

WorkerContext

isEventLoopContext()方法与isMultiThreadedWorkerContext()方法当然都返回false。接下来就是executeAsync()方法

1
2
3
4
@Override
public void executeAsync(Handler<Void> task) {
orderedTasks.execute(wrapTask(null, task, true, workerPool.metrics()), workerPool.executor());
}

可以看到仅仅是将task封装成Runnable对象然后提交到TaskQueue中去执行。

MultiThreadedWorkerContext

MultiThreadedWorkerContext继承了WorkerContext,不同之处就是isMultiThreadedWorkerContext()方法与executeAsync()方法进行了重写。
前者不用说,直接看executeAsync()

1
2
3
4
@Override
public void executeAsync(Handler<Void> task) {
workerPool.executor().execute(wrapTask(null, task, false, workerPool.metrics()));
}

MultiThreadedWorkerContext中,executeAsync()方法会将task直接交给workerPool的executorService去处理。

小结

  1. 一个线程可能会被多个Context所使用,但是一个Context只可能同时在一个线程执行,不可能出现一个Context同时关联多个线程的情况。

  2. 调用Vert.x的API时候,如果当前线程是Vert.x线程,那么此时就会复用这个Vert.x线程已经关联好的Context,如果不是就会创建一个新的Context。

  3. 一个Verticle 部署以后会关联一个新的Context,并且只对应这个Context,Context类型会由DeploymentOptions决定, 就是之前讲的三种类型。所有在Verticle里面执行的handler都会用Verticle关联的唯一的Context。另外,如果在Standard Verticle里面调用executeBlocking()方法执行阻塞代码,尽管任务被丢给worker pool里面的一个线程去执行,但Context这个时候还是EventLoopContext类型。

------ 本文结束 ------

版权声明


BillyYccc's blog by Billy Yuan is licensed under a Creative Commons BY-NC-SA 4.0 International License.
本文原创于BillyYccc's Blog,转载请注明原作者及出处!